package com.k2de.consumer.impl;

import com.k2data.common.config.InitConfig;
import com.k2data.k2de.utils.CheckAssetsUtil;
import com.k2data.platform.ddm.k2db.realtime.data.common.util.MethodTimeRecorderUtil;
import com.k2data.platform.ddm.sdk.builder.KMXRecord;
import com.k2de.consumer.config.InitConfigForConsumer;
import com.k2de.consumer.inter.KafkaSenderInter;
import com.k2de.consumer.inter.MessageConsumerInter;
import com.k2de.consumer.inter.MessageToRecordConverterInter;
import com.k2de.consumer.result.KmxRecordResult;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka consumer that, once a message has been consumed and converted, forwards the result to KMX.
 *
 * <p>Each {@link KmxRecordResult} is routed to one of two senders: the KMX sender when the result
 * is flagged successful and the asset check passes, otherwise the exception-region sender, which
 * receives the original message.
 *
 * Created by chenjingshuai on 18-1-9.
 */
public abstract class KafkaConsumerToKMX extends KafkaConsumer implements MessageConsumerInter {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerToKMX.class);

    /** SDM API endpoint handed to {@link CheckAssetsUtil} for asset lookups and registration. */
    private final String sdmApiUrl;

    /** Sender for converted KMX records; may be replaced by a method-timing proxy in the constructor. */
    private KafkaSenderInter<KMXRecord> kmxSender = new KMXSenderImpl();

    /** Sender for original messages that could not be delivered as KMX records. */
    private KafkaSenderInter<String> exceptionRegionSender = new ExceptionRegionSenderImpl();

    /**
     * Creates a consumer using the group name and absent-offset policy configured in
     * {@link InitConfigForConsumer}, passing {@code null} as the consumed Kafka key prefix.
     *
     * @param zkUrl          ZooKeeper URL forwarded to the parent consumer
     * @param kafkaServerUrl Kafka server URL used to initialise both senders
     * @param sdmApiUrl      SDM API URL used for asset checks
     */
    public KafkaConsumerToKMX(String zkUrl, String kafkaServerUrl, String sdmApiUrl) {
        this(
                zkUrl,
                kafkaServerUrl,
                null,
                InitConfigForConsumer.CONSUMER_GROUP_NAME,
                InitConfigForConsumer.OFFSET_WHEN_OFFSET_INFO_IS_ABSENT_IN_ZK,
                sdmApiUrl);
    }

    /**
     * Creates a consumer, initialises both senders and asks {@link CheckAssetsUtil} to load all assets.
     *
     * @param zkUrl                            ZooKeeper URL forwarded to the parent consumer
     * @param kafkaServerUrl                   Kafka server URL used to initialise both senders
     * @param consumedKafkaKeyPrefix           key prefix forwarded to the parent consumer
     * @param groupName                        consumer group name forwarded to the parent consumer
     * @param offsetWhenOffsetInfoIsAbsentInZk offset policy forwarded to the parent consumer
     * @param sdmApiUrl                        SDM API URL used for asset checks
     */
    public KafkaConsumerToKMX(String zkUrl, String kafkaServerUrl, String consumedKafkaKeyPrefix, String groupName,
                              String offsetWhenOffsetInfoIsAbsentInZk, String sdmApiUrl) {
        super(zkUrl, consumedKafkaKeyPrefix, groupName, offsetWhenOffsetInfoIsAbsentInZk);
        this.sdmApiUrl = sdmApiUrl;

        // Initialise the raw senders first; the timing proxies (if enabled) wrap the initialised instances.
        kmxSender.init(kafkaServerUrl);
        exceptionRegionSender.init(kafkaServerUrl);

        if (InitConfig.WHETHER_TO_RECORD_METHOD_TIME) {
            kmxSender = MethodTimeRecorderUtil.getProxyThatRecordsMethodTime(kmxSender, KafkaSenderInter.class);
            exceptionRegionSender = MethodTimeRecorderUtil.getProxyThatRecordsMethodTime(
                    exceptionRegionSender, KafkaSenderInter.class);
        }

        // Cache all assets up front.
        CheckAssetsUtil.getAllAssets(sdmApiUrl);
    }

    /**
     * Routes a conversion result to the appropriate sender.
     *
     * <p>The KMX record is sent only when the result reports success and
     * {@code CheckAssetsUtil.checkAndAddAssetWhenItIsNonExistent} returns {@code true}; in every
     * other case the original message is sent to the exception region.
     *
     * @param kmxRecordResult the conversion result to dispatch
     */
    @Override
    public void sendKmxRecordResult(KmxRecordResult kmxRecordResult) {
        // Short-circuit: the asset check (which registers the asset when it does not exist yet)
        // only runs for messages that were converted correctly.
        final boolean readyForKmx = kmxRecordResult.isSuc()
                && CheckAssetsUtil.checkAndAddAssetWhenItIsNonExistent(
                        kmxRecordResult.getFieldGroup(),
                        kmxRecordResult.getCompoundId(),
                        sdmApiUrl,
                        kmxRecordResult.getCompoundIdDescription());

        if (!readyForKmx) {
            exceptionRegionSender.send(kmxRecordResult.getOriginalMessage());
            return;
        }

        kmxSender.send(kmxRecordResult.getKmxRecord());
    }

}
